package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Hands-on examples of Spark RDD action operations.
 *
 * <p>Each private method is a self-contained demo that builds its own
 * {@link JavaSparkContext}, runs a single action and releases the context
 * again. {@link #main(String[])} selects which demo is executed.
 */
public class ActionOperation {

    /**
     * Entry point. Exactly one demo is enabled at a time; uncomment another
     * call (and comment out the active one) to run a different demo.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // reduce();
        // collect();
        // count();
        // take();
        // saveAsTextFile();
        countByKey();
    }

    /**
     * reduce demo: sums the integers 1 through 10 and prints the result.
     */
    private static void reduce() {
        SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);

            // Fold the elements pairwise with addition.
            Integer sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            System.out.println("1到10累加: " + sum);
        } finally {
            // Release the context even when the job fails.
            sc.close();
        }
    }

    /**
     * collect demo: doubles every element, pulls the resulting RDD back to
     * the driver as a list and prints each value.
     */
    private static void collect() {
        SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);

            // Typed as JavaRDD<Integer> (not a raw JavaRDD) so collect() is checked.
            JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer value) throws Exception {
                    return value * 2;
                }
            });

            List<Integer> doubleNumberList = doubleNumbers.collect();
            for (Integer doubled : doubleNumberList) {
                System.out.println(doubled);
            }
        } finally {
            // Release the context even when the job fails.
            sc.close();
        }
    }

    /**
     * count demo: prints the number of elements in the RDD.
     */
    private static void count() {
        SparkConf conf = new SparkConf().setAppName("count").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);

            long count = numbers.count();

            System.out.println(count);
        } finally {
            // Release the context even when the job fails.
            sc.close();
        }
    }

    /**
     * take demo: fetches the first three elements of the RDD and prints them.
     */
    private static void take() {
        SparkConf conf = new SparkConf().setAppName("take").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);

            // take(n) returns the first n elements; it does not sort.
            List<Integer> firstThree = numbers.take(3);

            for (Integer num : firstThree) {
                System.out.println(num);
            }
        } finally {
            // Release the context even when the job fails.
            sc.close();
        }
    }

    /**
     * saveAsTextFile demo: doubles every element and writes the result to
     * HDFS as text.
     *
     * <p>No master is set on the configuration here, unlike the other demos.
     * NOTE(review): presumably this demo is meant to be launched through
     * spark-submit, which supplies the master — confirm before running it
     * from an IDE.
     */
    private static void saveAsTextFile() {
        SparkConf conf = new SparkConf().setAppName("saveAsTextFile");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numbers = sc.parallelize(numberList);

            // Typed as JavaRDD<Integer> rather than a raw JavaRDD.
            JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer value) throws Exception {
                    return value * 2;
                }
            });

            doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_numbers");
        } finally {
            // Release the context even when the write fails.
            sc.close();
        }
    }

    /**
     * countByKey demo: counts the students in each class and prints one
     * {@code class:count} line per class.
     */
    private static void countByKey() {
        SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Sample data: (class name, student name) pairs.
            List<Tuple2<String, String>> studentList = Arrays.asList(
                    new Tuple2<String, String>("class1", "leo"),
                    new Tuple2<String, String>("class2", "marry"),
                    new Tuple2<String, String>("class1", "jack"),
                    new Tuple2<String, String>("class2", "张三"),
                    new Tuple2<String, String>("class2", "李四")
            );

            // Parallelize the collection into a pair RDD keyed by class name.
            JavaPairRDD<String, String> students = sc.parallelizePairs(studentList);

            // countByKey is an action: the per-key counts come back to the
            // driver as an ordinary Map, not as an RDD.
            Map<String, Object> studentCounts = students.countByKey();

            for (Map.Entry<String, Object> entry : studentCounts.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        } finally {
            // Release the context even when the job fails.
            sc.close();
        }
    }

}
